<?php

use Swoole\Coroutine;

/*
 * 解析配置文件
 */
$config = parse_ini_file("./config.ini");
if (empty($config)) {
    die("配置文件错误");
}

/*
 * 校验port
 */
$portStr = $config['cmpp_port'] ?? '';
if (empty($portStr)) {
    die("端口错误");
} else {
    $portArr = explode(",", $portStr);
}

/*
 * 校验report
 */
$confReportStr = $config['report'] ?? '';
$confReport2rate = [];
if (!empty($confReportStr)) {
    $confReportArrs = explode(",", $confReportStr);
    if (empty($confReportArrs)) {
        die("report格式错误(,)");
    }

    $faileReportRateSum = 0;
    foreach ($confReportArrs as $confReportArr) {
        $confReportKV = explode("=>", $confReportArr);
        if (empty($confReportKV) || count($confReportKV) != 2) {
            die("report格式错误(=>)");
        }

        $confReport2rate[$confReportKV[0]] = $confReportKV[1];
        $faileReportRateSum += $confReportKV[1];

        if (strlen($confReportKV[0]) != 7) {
            die("report格式错误(状态码必须长7位) " . $confReportKV[0]);
        }

        if ($faileReportRateSum > 100) {
            die("report格式错误(大于100%)");
        }
    }
} else {
    die("report不能为空");
}

/*
 * 校验report延迟
 */
$confReportDelayStr = $config['report_delay'] ?? '';
$confReportDelay2rate = [];
if (!empty($confReportDelayStr)) {
    $confReportDelayArrs = explode(",", $confReportDelayStr);
    if (empty($confReportDelayArrs)) {
        die("report delay格式错误(,)");
    }

    $rateSum = 0;

    foreach ($confReportDelayArrs as $delayArr) {
        $delayKV = explode("=>", $delayArr);
        if (empty($delayKV) || count($delayKV) != 2) {
            die("report delay格式错误(=>)");
        }

        $confReportDelay2rate[$delayKV[0]] = $delayKV[1];
        $rateSum += $delayKV[1];

        $tmp = explode("-", $delayKV[0]);
        if (empty($tmp) || count($tmp) != 2) {
            die("report delay格式错误 " . $delayKV[0]);
        }
    }

    if ($rateSum != 100) {
        die("report delay格式错误(必须等于100%)");
    }
} else {
    die("report delay不能为空");
}

/*
 * 校验deliver_rate
 */
$confDeliverRate = $config['deliver_rate'] ?? '';
if (!empty($confDeliverRate)) {
    $confDeliverRate = floatval($confDeliverRate);
    if ($confDeliverRate > 100) {
        die("deliver_rate格式错误");
    }
}

/*
 * 校验deliver content
 */
$confDeliverStr = $config['deliver_content'] ?? '';
$confDeliver2rate = [];
if (!empty($confDeliverStr)) {
    $confDeliverArrs = explode(",", $confDeliverStr);
    if (empty($confDeliverArrs)) {
        die("deliver content格式错误(,)");
    }

    $RateSum = 0;
    foreach ($confDeliverArrs as $confDeliverArr) {
        $confKV = explode("=>", $confDeliverArr);
        if (empty($confKV) || count($confKV) != 2) {
            die("deliver content格式错误(=>)");
        }

        $confDeliver2rate[$confKV[0]] = $confKV[1];
        $RateSum += $confKV[1];
    }
    if ($RateSum != 100) {
        die("deliver content格式错误(必须等于100%)");
    }
}

/*
 * 校验deliver delay延迟
 */
$confDeliverDelayStr = $config['deliver_delay'] ?? '';
$confDeliverDelay2rate = [];
if (!empty($confDeliverDelayStr)) {
    $confDeliverDelayArrs = explode(",", $confDeliverDelayStr);
    if (empty($confDeliverDelayArrs)) {
        die("deliver delay格式错误(,)");
    }

    $rateSum = 0;

    foreach ($confDeliverDelayArrs as $delayArr) {
        $delayKV = explode("=>", $delayArr);
        if (empty($delayKV) || count($delayKV) != 2) {
            die("deliver delay格式错误(=>)");
        }

        $confDeliverDelay2rate[$delayKV[0]] = $delayKV[1];
        $rateSum += $delayKV[1];

        $tmp = explode("-", $delayKV[0]);
        if (empty($tmp) || count($tmp) != 2) {
            die("deliver delay格式错误 " . $delayKV[0]);
        }
    }

    if ($rateSum != 100) {
        die("deliver delay格式错误(必须等于100%)");
    }
}

/*
 * 核心类
 */

class Cmpp2
{

    const CMPP_CONNECT = 0x00000001;
    const CMPP_CONNECT_RESP = 0x80000001;
    const CMPP_TERMINATE = 0x00000002;
    const CMPP_TERMINATE_RESP = 0x80000002;
    const CMPP_SUBMIT = 0x00000004;
    const CMPP_SUBMIT_RESP = 0x80000004;
    const CMPP_DELIVER = 0x00000005;
    const CMPP_DELIVER_RESP = 0x80000005;
    const CMPP_QUERY = 0x00000006;
    const CMPP_QUERY_RESP = 0x80000006;
    const CMPP_CANCEL = 0x00000007;
    const CMPP_CANCEL_RESP = 0x80000007;
    const CMPP_ACTIVE_TEST = 0x00000008;
    const CMPP_ACTIVE_TEST_RESP = 0x80000008;
    const STATUS_SUCCESS = 0;
    const STATUS_STRUCTURE_ERROR = 1;
    //connect
    const STATUS_ILLEGAL_ADDR = 2;
    const STATUS_AUTH_ERROR = 3;
    const STATUS_VERSION_ERROR = 4;
    const STATUS_EXCEED_CON_NUM = 16;
    const STATUS_NOT_CMPP2_USER = 18;
    const STATUS_NO_USER = 19;
    const STATUS_PRICE_ERROR = 20;
    //submit
    const STATUS_INVALID_NUM = 100;
    const STATUS_EXCEED_LIMIT = 8;
    const STATUS_NOT_LOGIN = 102;
    const STATUS_INS_BALANCE = 103;
    const STATUS_PRODUCT_LOCKED = 101;
    const STATUS_PRODUCT_TYPE_LOCKED = 105;

    protected $packRule = 'cleftProtocolLength/cflagLength/cleftSms/cuniqueSmsFlag/csmsTotal/csmsIndex';
    public $headerPackRule = 'NTotal_Length/NCommand_Id/NSequence_Id'; //头部解析规则
    public $bodyPackRule = [//协议体解析规则
        self::CMPP_CONNECT => 'a6Source_Addr/a16AuthenticatorSource/CVersion/NTimestamp',
        self::CMPP_CONNECT_RESP => '',
        self::CMPP_TERMINATE => '',
        self::CMPP_TERMINATE_RESP => '',
        self::CMPP_SUBMIT => 'a8Msg_Id/CPk_total/CPk_number/CRegistered_Delivery/CMsg_level/a10Service_Id/CFee_UserType/a21Fee_terminal_Id/CTP_pId/CTP_udhi/CMsg_Fmt/a6Msg_src/a2FeeType/a6FeeCode/a17ValId_Time/a17At_Time/a21Src_Id/CDestUsr_tl/a21Dest_terminal_Id/CMsg_Length/',
        self::CMPP_SUBMIT_RESP => '',
        self::CMPP_DELIVER => 'a8Msg_Id/',
        self::CMPP_DELIVER_RESP => 'JMsg_Id/CResult',
        self::CMPP_QUERY_RESP => '',
        self::CMPP_QUERY => '',
        self::CMPP_CANCEL => '',
        self::CMPP_CANCEL_RESP => '',
        self::CMPP_ACTIVE_TEST => '',
        self::CMPP_ACTIVE_TEST_RESP => 'CReserved',
    ];
    public $bodyLen = [//协议体长度
        self::CMPP_CONNECT => 27, //6 16 1 4
        self::CMPP_CONNECT_RESP => 18, // 1 16 1
        self::CMPP_TERMINATE => 0,
        self::CMPP_TERMINATE_RESP => 0,
        self::CMPP_SUBMIT => 147, //8 1 1 1 1 10 1 21 1 1 1 6 2 6 17 17 21 1 21 1 8 ！！未包含短信长度，因为该长度可变
        self::CMPP_SUBMIT_RESP => 9, //8 1
        self::CMPP_DELIVER => 0,
        self::CMPP_DELIVER_RESP => 9, //8 1
        self::CMPP_QUERY_RESP => 0,
        self::CMPP_QUERY => 0,
        self::CMPP_CANCEL => 0,
        self::CMPP_CANCEL_RESP => 0,
        self::CMPP_ACTIVE_TEST => 0,
        self::CMPP_ACTIVE_TEST_RESP => 1, //1
    ];
    public $errorList = [
        self::CMPP_CONNECT => [
            self::STATUS_STRUCTURE_ERROR => '消息结构错',
            self::STATUS_ILLEGAL_ADDR => '非法源地址',
            self::STATUS_AUTH_ERROR => '认证错',
            self::STATUS_VERSION_ERROR => '版本太高',
            self::STATUS_EXCEED_CON_NUM => '用户的连接数超出限制',
            self::STATUS_NOT_CMPP2_USER => '该用户配置的协议为HTTP,不支持使用CMPP协议登录',
            self::STATUS_NO_USER => '未获取到用户信息',
            self::STATUS_PRICE_ERROR => '单价设置错误',
        ],
        self::CMPP_CONNECT_RESP => [],
        self::CMPP_TERMINATE => [],
        self::CMPP_TERMINATE_RESP => [],
        self::CMPP_SUBMIT => [
            self::STATUS_PRODUCT_LOCKED => '产品被锁定',
            self::STATUS_PRODUCT_TYPE_LOCKED => '产品类型被锁定',
            self::STATUS_INVALID_NUM => '长号码无效',
            self::STATUS_EXCEED_LIMIT => '流量超限',
            self::STATUS_NOT_LOGIN => '用户未登录',
            self::STATUS_INS_BALANCE => '余额不足',
        ],
        self::CMPP_SUBMIT_RESP => [],
        self::CMPP_DELIVER => [],
        self::CMPP_DELIVER_RESP => [],
        self::CMPP_QUERY_RESP => [],
        self::CMPP_QUERY => [],
        self::CMPP_CANCEL => [],
        self::CMPP_CANCEL_RESP => [],
        self::CMPP_ACTIVE_TEST => [],
        self::CMPP_ACTIVE_TEST_RESP => [],
    ];
    public $status = self::STATUS_SUCCESS; //协议状态
    public $error; //协议错误
    public $needCloseFd = false; //是否需要关闭连接
    public $response; //协议响应
    protected $commandId; //协议动作
    protected $headerBinary; //协议头
    protected $bodyBinary; //协议头
    public $headerArr; //解析后的协议头
    protected $bodyArr; //解析后的协议头
    public $msgHexId; //msg id的十六进制字符串表现
    public $msgIdDecArr; //十进制msgid数组
    public static $server = null;
    public static $atomit = null;
    public static $atomitDeliver = null;
    public static $channel_suc_and_fail = null;
    public static $mobile2report = [];
    public static $mobile2reportForUK = [];
    public static $longMsg = null; //长信缓存
    public static $spid2fd = []; //spid到fd的数组
    public static $fd2spid = []; //fd到spid的映射

    public function setBinary(string $binary)
    {
        $this->headerBinary = substr($binary, 0, 12);
        $this->bodyBinary = substr($binary, 12);
    }

    /**
     * parseLongMsgHeader 解析长短信的协议头
     * @param $binary
     */
    public function parseLongMsgHeader($binary)
    {
        $msgHeadArr = @unpack($this->packRule, $binary);

        if ($msgHeadArr === false) {
            throw new Exception('用户提交协议有误');
        }

        $this->smsTotal = $msgHeadArr['smsTotal'];

        if (!is_numeric($this->smsTotal) || $this->smsTotal <= 0) {
            throw new Exception('用户提交协议有误');
        }

        $this->smsIndex = $msgHeadArr['smsIndex'];

        if (!is_numeric($this->smsIndex) || $this->smsIndex <= 0) {
            throw new Exception('用户提交协议有误');
        }

        $this->uniqueSmsFlag = $msgHeadArr['uniqueSmsFlag'];
    }

    /**
     * parseHeader 解析数据头部获取协议动作
     * @return bool
     */
    public function parseHeader()
    {
        $this->headerArr = @unpack($this->headerPackRule, $this->headerBinary);

        $this->commandId = isset($this->headerArr['Command_Id']) ? $this->headerArr['Command_Id'] : null;

        if (empty($this->commandId)) {
            throw new Exception('协议头结构错误');
        }

        return true;
    }

    /**
     * parseBody 解析协议体
     * @return bool
     */
    public function parseBody()
    {
        //协议未设置commandId 或者 commandId不在给定的范围内
        if (!isset($this->commandId) || !isset($this->bodyPackRule[$this->commandId])) {
            throw new Exception('协议头CommandId错误');
        }

        //拆除连接和客户端探活操作无协议体
        if ($this->commandId === self::CMPP_TERMINATE || $this->commandId === self::CMPP_ACTIVE_TEST) {
            return true;
        }

        //是否有解包格式 如果全部完成则该步骤删除
        if ($this->bodyPackRule[$this->commandId] === '') {
            throw new Exception('暂未开放此CommandId');
        }

        switch ($this->commandId) {
            case Cmpp2::CMPP_SUBMIT:
                return $this->parseSubmitBody();
                break;
            default:
                return $this->parseCommonBody();
                break;
        }
    }

    /**
     * parseSubmitBody 解析submit协议体 此协议体是变长
     * @return bool
     */
    public function parseSubmitBody()
    {
        //协议体长度
        $bodyBinaryLen = strlen($this->bodyBinary);

        if ($bodyBinaryLen <= $this->bodyLen[self::CMPP_SUBMIT]) {
            //设置错误码和错误信息
            $this->setErrorStatus(self::STATUS_STRUCTURE_ERROR);

            //设置返回的数据包
            $this->packageStructureErrResp();

            return false;
        }

        //短信长度等于协议提总长-扣除短信的协议提长
        $msgContentLength = $bodyBinaryLen - $this->bodyLen[self::CMPP_SUBMIT];

        //解析协议体
        $this->bodyArr = @unpack($this->bodyPackRule[$this->commandId] . 'a' . $msgContentLength . 'Msg_Content/a8Reserve', $this->bodyBinary);

        if ($this->bodyArr === false) {
            throw new Exception('协议解析错误');
        }

        return true;
    }

    /**
     * parseCommonBody 解析普通协议体
     * @return bool
     */
    public function parseCommonBody()
    {
        //如果协议体长度和规定的不一致则返回
        if ($this->bodyLen[$this->commandId] !== strlen($this->bodyBinary)) {
            //设置错误码和错误信息
            $this->setErrorStatus(self::STATUS_STRUCTURE_ERROR);

            //设置返回的数据包
            $this->packageStructureErrResp();

            return false;
        }

        $this->bodyArr = @unpack($this->bodyPackRule[$this->commandId], $this->bodyBinary);

        if ($this->bodyArr === false) {
            throw new Exception('协议解析错误');
        }

        return true;
    }

    /**
     * getCommandId 获取协议动作
     * @return string
     */
    public function getCommandId()
    {
        return $this->commandId;
    }

    /**
     * getNeedCloseFd
     * @return bool
     */
    public function getNeedCloseFd()
    {
        return $this->needCloseFd;
    }

    /**
     * getResponse 获取响应数据
     * @return string
     */
    public function getResponse()
    {
        return $this->response;
    }

    /**
     * getHeader 获取协议头
     * @param  string  $key
     * @param  string  $default
     * @return array|string
     */
    public function getHeader(string $key = '', string $default = '')
    {
        if (empty($key)) {
            return $this->headerArr;
        }

        return isset($this->headerArr[$key]) ? $this->headerArr[$key] : $default;
    }

    /**
     * getBody 获取协议体
     * @param  string  $key
     * @param $default
     * @return array|string
     */
    public function getBody(string $key = '', $default = '')
    {
        if (empty($key)) {
            return $this->bodyArr;
        }

        if (isset($this->bodyArr[$key])) {
            return $this->bodyArr[$key];
        }

        return $default;
    }

    /**
     * getStatus 获取协议返回状态
     * @return int
     */
    public function getStatus()
    {
        return $this->status;
    }

    /**
     * getMsgHexId 获取十六进制的msg id
     * @return mixed
     */
    public function getMsgHexId()
    {
        return $this->msgHexId;
    }

    /**
     * setErrorStatus 设置错误
     * @param  int  $status
     */
    public function setErrorStatus(int $status)
    {
        $this->status = $status;

        $this->error = $this->errorList[$this->commandId][$status];
    }

    public function handleSubmit()
    {
        //获取msgid二进制字符串
        [$this->msgIdDecArr, $hexArr] = self::generateMsgIdArr($this->getBody('Msg_src'));

        $this->msgHexId = implode('', $hexArr);

        return true;
    }

    /**
     * handle 处理协议
     * @return bool
     * @throws Exception
     */
    public function handle($fd)
    {
        switch ($this->commandId) {
            case self::CMPP_CONNECT:
                //客户端提交的连接请求
                return $this->handleConnect($fd);
            case self::CMPP_SUBMIT:
                //客户端提交的发送连接请求
                return $this->handleSubmit();
            case self::CMPP_DELIVER_RESP:
                //客户端返回上行包
                return true;
            case self::CMPP_TERMINATE:
                //客户端提交的断开连接请求
                return $this->handleTerminate();
            case self::CMPP_ACTIVE_TEST:
                //客户段提交的探活请求
                return $this->handleActiveTest();
            case self::CMPP_ACTIVE_TEST_RESP:
                //客户端发返回的探活Resp包,不做处理
                return true;
            default:
                throw new Exception('协议头结构错误');
                break;
        }
    }

    /**
     * handleConnect 处理连接
     * @return bool
     * @throws Exception
     */
    public function handleConnect($fd)
    {
        $spId = trim($this->getBody('Source_Addr'));

        Cmpp2::$spid2fd[$spId][] = $fd;

        Cmpp2::$fd2spid[$fd] = $spId;

        //获取用户约定好的密码，用于cmpp2 auth校验和打包回执
        $sharedSecret = $this->getBody('password');

        //将传输过来的二进制数据转换成16进制，用于cmpp2 auth校验和打包回执
        $authenticatorSource = bin2hex($this->getBody('AuthenticatorSource'));

        //二进制转换会丢失首部的0 需要补上
        $timeStamp = str_pad((string) $this->getBody('Timestamp'), 10, '0', STR_PAD_LEFT);

        //打包响应
        $this->packageConnectResp($authenticatorSource, $sharedSecret);

        return true;
    }

    /**
     * handleDeliverResp
     * @return bool
     */
    public function handleDeliverResp()
    {
//        $submitMsgId = dechex($this->getBody('Msg_Id'));
//
//        $submitMsgId = str_pad($submitMsgId, 16, '0', STR_PAD_LEFT);
//
        return true;
    }

    /**
     * handleTerminate 处理客户端的断开连接请求
     * @return bool
     */
    public function handleTerminate()
    {
        $this->packageTerminateResp();

        return true;
    }

    /**
     * handleActiveTest 处理客户端探活
     * @return bool
     */
    public function handleActiveTest()
    {
        //如果客户端主动发送探活则重置探活失败次数和更新探活时间，避免服务端再次发送探活

        $this->packageActiveTestResp();

        return true;
    }

    /**
     * packageStructureErrResp 打包结构体错误的响应
     */
    public function packageStructureErrResp()
    {
        switch ($this->commandId) {
            case self::CMPP_CONNECT:
                $this->packageConnectResp('', '');
                break;
            case self::CMPP_SUBMIT:
                $this->getNoUserSessMsgIdArr();

                $this->packageSubmitResp();
                break;
        }
    }

    /**
     * packageConnectResp 打包连接响应数据
     * @param $authenticatorSource
     * @param $sharedSecret
     */
    public function packageConnectResp($authenticatorSource, $sharedSecret)
    {
        //生成AuthenticatorISMG
        $authenticatorISMG = md5($this->getStatus() . $authenticatorSource . $sharedSecret, true);

        //生成响应体
        $respBodyBinary = pack('Ca16C', $this->getStatus(), $authenticatorISMG, 0x20);

        //生成响应头
        $respHeaderBinary = pack('NNN', strlen($respBodyBinary) + 12, self::CMPP_CONNECT_RESP, $this->getHeader('Sequence_Id') ?? '');

        $this->response = $respHeaderBinary . $respBodyBinary;
    }

    /**
     * packageSubmitResp
     * TODO 放到扩展里面做提高性能
     */
    public function packageSubmitResp()
    {
        $this->msgIdDecArr[] = $this->getStatus();
        //生成响应体
        $respBodyBinary = pack('C8C', ...$this->msgIdDecArr);

        //生成响应头
        $respHeaderBinary = pack('NNN', strlen($respBodyBinary) + 12, self::CMPP_SUBMIT_RESP, $this->getHeader('Sequence_Id') ?? '');

        $this->response = $respHeaderBinary . $respBodyBinary;
    }

    /**
     * packageTerminateResp 设置erminate响应数据
     */
    public function packageTerminateResp()
    {
        $this->response = pack('NNN', 12, self::CMPP_TERMINATE_RESP, $this->getHeader('Sequence_Id'));
    }

    /**
     * packageActiveTestResp 设置探活响应数据
     */
    public function packageActiveTestResp()
    {
        $respBodyBinary = pack('c', '0');

        $respHeaderBinary = pack('NNN', strlen($respBodyBinary) + 12, self::CMPP_ACTIVE_TEST_RESP, $this->getHeader('Sequence_Id'));

        $this->response = $respHeaderBinary . $respBodyBinary;
    }

    /**
     * generateMsgIdArr 生成msgid二进制字符串，转换成八位的数组
     * @param $srcNumber
     * @return array
     * TODO 放到扩展里面做提高性能
     */
    public static function generateMsgIdArr($srcNumber)
    {
        $msgSequenceId = self::generateMsgSequenceId();

        $dateStr = date('mdHis');

        $dateArr = str_split($dateStr, 2);

        //转换成二进制字符串
        $msgIdStr = sprintf("%04s%05s%05s%06s%06s%022s%016s", decbin($dateArr[0]), decbin($dateArr[1]), decbin($dateArr[2]), decbin($dateArr[3]), decbin($dateArr[4]), decbin($srcNumber), decbin($msgSequenceId));


        //分割字符串为8位一组
        $msgIdBinary = str_split($msgIdStr, 8);

        //将二进制转换为十进制因为pack只认字符串10进制数为十进制数
        $decArr = []; //十进制
        $hexArr = []; //十六进制
        foreach ($msgIdBinary as $binary) {
            $dec = bindec($binary);
            $decArr[] = $dec;
            $hexArr[] = str_pad(dechex($dec), 2, '0', STR_PAD_LEFT);
        }

        return [$decArr, $hexArr];
    }

    /**
     * parseMsgIdToDecArr 将十六进制的msgId转换成十进制的八份数据
     * @param $msgId
     * @return mixed
     */
    public static function parseMsgIdToDecArr($msgId)
    {
        //msgId是个十六进制的16字节数据,将16字节按2字节一份分成八份，如果按照16进制算其实两份才是一个字节
        $hexMsgIdArr = str_split($msgId, 2);

        $msgDecArr = [];

        foreach ($hexMsgIdArr as $byte) {
            $msgDecArr[] = hexdec($byte);
        }

        return $msgDecArr;
    }

    /**
     * generateMsgSequenceId 获取msg的sequence id
     * @return int
     */
    protected static function generateMsgSequenceId()
    {
        $sequenceId = Cmpp2::$atomit->add();
        return $sequenceId % 65535;
    }

    /**
     * generateProSequenceId 生成协议的sequence id
     * @return int
     */
    protected static function generateProSequenceId()
    {
        $sequenceId = Cmpp2::$atomit->add();
        return $sequenceId % 65535;
    }

    /**
     * getNoUserSessMsgIdArr 获取没有sess的情况下的msgid数组
     */
    public function getNoUserSessMsgIdArr()
    {
        //获取msgid二进制字符串
        [$this->msgIdDecArr, $hexArr] = self::generateMsgIdArr('');

        $this->msgHexId = implode('', $hexArr);
    }

    /**
     * sendTerminate 发送拆除连接信息
     * @param $fd
     * @return mixed
     */
    public static function sendTerminate($fd)
    {
        //获取序列ID
        $sequenceId = self::generateProSequenceId();

        //拼装头信息
        $pack = pack('NNN', 12, self::CMPP_TERMINATE, $sequenceId);

        //发送拆除连接信息
        return Cmpp2::$server->send($fd, $pack);
    }

    /**
     * sendActiveTest
     * @param $fd
     * @return mixed
     */
    public static function sendActiveTest($fd)
    {
        //获取序列ID
        $sequenceId = self::generateProSequenceId();

        //拼装头信息
        $pack = pack('NNN', 12, self::CMPP_ACTIVE_TEST, $sequenceId);

        //发送链路检测
        return Cmpp2::$server->send($fd, $pack);
    }

    /**
     * prepareReportBinary
     * @param $upStreamMsgId
     * @param $spId
     * @param $userExt
     * @param $mobile
     * @param $stat
     * @param  string  $submitTime
     * @param  string  $doneTime
     * @return array
     */
    public static function prepareReportBinary($upStreamMsgIdDecArr, $userExt, $mobile, $stat, $submitTime = '', $doneTime = '')
    {

        $upStreamMsgIdBinary = @pack('C8', ...$upStreamMsgIdDecArr);

        $time = date('ymdHi');

        $submitTime = $submitTime ?: $time;

        $doneTime = $doneTime ?: $time;

        $msgContentBinary = $upStreamMsgIdBinary . pack('a7a10a10a21N', $stat, $submitTime, $doneTime, $mobile, 0);

        $msgLength = 57;

        $bodyBinary = $upStreamMsgIdBinary . pack('a21a10CCCa21CCa' . $msgLength . 'a8', $userExt, '', 0, 0, 0, $mobile, 1, $msgLength, $msgContentBinary, '');
        $headerBinary = pack('NNN', 12 + strlen($bodyBinary), self::CMPP_DELIVER, self::generateProSequenceId());

        return $headerBinary . $bodyBinary;
    }

    public static function prepareDeliverBinary($spId, $userExt, $mobile, $msgContent, $msgFmt)
    {
        [$msgDecArr, $msgHexArr] = self::generateMsgIdArr($spId);

        $msgIdBinary = @pack('C8', ...$msgDecArr);

        $msgLength = strlen($msgContent);

        //msg长度最长255
        if ($msgLength > 255) {
            $msgContent = substr($msgContent, 0, 255);

            $msgLength = 255;
        }

        $bodyBinary = $msgIdBinary . pack('a21a10CCCa21CCa' . $msgLength . 'a8', $userExt, '', 0, 0, $msgFmt, $mobile, 0, $msgLength, $msgContent, ''
        );

        $headerBinary = pack('NNN', 12 + strlen($bodyBinary), self::CMPP_DELIVER, self::generateProSequenceId());

        return [implode('', $msgHexArr), $headerBinary . $bodyBinary];
    }

    public static function respAndClose($server, $fd, Cmpp2 $protocol)
    {
        //如果fd存在，并且不是接收的响应包则反回
        if ($server->exist($fd) && !in_array($protocol->getCommandId(), [
                    Cmpp2::CMPP_ACTIVE_TEST_RESP,
                    Cmpp2::CMPP_TERMINATE_RESP,
                    Cmpp2::CMPP_DELIVER_RESP
                ])) {
            //发送响应
            $server->send($fd, $protocol->getResponse());

            //如果需要关闭连接或是客户端的terminate操作则关闭连接
            if ($protocol->getNeedCloseFd() || $protocol->getCommandId() === Cmpp2::CMPP_TERMINATE) {
                if (!$server->close($fd)) {
                    Log::warning('【协议出错】客户端fd：' . $fd . '关闭失败，请确认');
                }
            }
        }
    }

    public static function getReportRedis()
    {
        global $portStr;
        return "channel_suc_and_fail" . $portStr;
    }

    public static function getDeliverRedis()
    {
        global $portStr;
        return "channel_deliver" . $portStr;
    }

    public static function popSucAndFailRedis()
    {
        go(function() {
            $redis = new redis();
            $redis->connect("127.0.0.1", 6379);
            while (1) {
                $time = (string) time();

                //获取expireAt 大于0小于等于当前时间的
                $seris = $redis->zRangeByScore(self::getReportRedis(), '0', $time);

                if (empty($seris)) {
                    Coroutine::sleep(1);
                    continue;
                }

                $redis->zRemRangeByScore(self::getReportRedis(), '0', $time);

                foreach ($seris as $seri) {
                    $unseri = unserialize($seri);
                    $expireAt = $unseri[0];
                    $submitTime = $unseri[1];
                    $isLong = $unseri[2];
                    $msgid = $unseri[3];
                    $src_id = $unseri[4];
                    $mobile = $unseri[5];
                    $report = $unseri[6];
                    $spid = $unseri[7];
                    $doneTime = date('ymdHi', $expireAt);

                    $fds = Cmpp2::$spid2fd[$spid] ?? [];

                    if (empty($fds)) {
                        continue;
                    }

                    $fd = $fds[array_rand($fds, 1)];

                    if ($report == "DELIVRD") {
                        Cmpp2::deliverDispatchLogic($spid, $src_id, $mobile, $fd, $redis);
                    }

                    if ($isLong) {
                        foreach ($msgid as $midArr) {
                            $bin = Cmpp2::prepareReportBinary($midArr, $src_id, $mobile, $report, $submitTime, $doneTime);
                            Cmpp2::$server->send($fd, $bin);
                        }
                    } else {
                        $bin = Cmpp2::prepareReportBinary($msgid, $src_id, $mobile, $report, $submitTime, $doneTime);
                        Cmpp2::$server->send($fd, $bin);
                    }
                }
            }
        });
    }

    public static function popDeliverRedis()
    {
        go(function() {
            $redis = new redis();
            $redis->connect("127.0.0.1", 6379);
            while (1) {
                $time = (string) time();

                //获取expireAt 大于0小于等于当前时间的
                $seris = $redis->zRangeByScore(self::getDeliverRedis(), '0', $time);

                if (empty($seris)) {
                    Coroutine::sleep(1);
                    continue;
                }

                $redis->zRemRangeByScore(self::getDeliverRedis(), '0', $time);

                foreach ($seris as $seri) {
                    $unseri = unserialize($seri);
                    $spId = $unseri[0];
                    $srcId = $unseri[1];
                    $mobile = $unseri[2];
//                    $fd = $unseri[3];
                    $gotDeliver = $unseri[4];

                    $fds = Cmpp2::$spid2fd[$spId] ?? [];

                    if (empty($fds)) {
                        continue;
                    }

                    $fd = $fds[array_rand($fds, 1)];

                    //检测是否包含中文
                    $haveChinese = (strlen($gotDeliver) == mb_strlen($gotDeliver, 'UTF-8')) ? false : true;
                    if ($haveChinese) {
                        [$msgId, $binary] = Cmpp2::prepareDeliverBinary($spId, $srcId, $mobile, iconv("UTF-8", "gbk//TRANSLIT", $gotDeliver), 15);
                    } else {
                        [$msgId, $binary] = Cmpp2::prepareDeliverBinary($spId, $srcId, $mobile, mb_convert_encoding($gotDeliver, 'UTF-16'), 8);
                    }

                    Cmpp2::$server->send($fd, $binary);
                }
            }
        });
    }

    public static function initPool()
    {
        Cmpp2::$channel_suc_and_fail = new Coroutine\Channel(1000);

        //通过channel转到redis   避免引入连接池
        go(function() {
            $redis = new redis();
            $redis->connect("127.0.0.1", 6379);
            while (1) {
                $seri = Cmpp2::$channel_suc_and_fail->pop(-1);
                $unser = unserialize($seri);
                $expireAt = $unser[0]; //doneTime
                $redis->zAdd(self::getReportRedis(), $expireAt, $seri);
            }
        });
    }

    public static function getReport()
    {
        global $confReport2rate;

        $rand = (rand(0, 1000)) / 10;

        $rateSum = 0;

        $gotReport = 'spec_UNKNOWN';

        foreach ($confReport2rate as $report => $rate) {
            $rateSum += $rate;
            if ($rand <= $rateSum) {
                $gotReport = $report;
                break;
            }
        }

        return $gotReport;
    }

    public static function getDelay()
    {
        global $confReportDelay2rate;

        $rand = (rand(0, 1000)) / 10;

        $rateSum = 0;

        $gotDelay = "1-2"; //??

        foreach ($confReportDelay2rate as $delay => $rate) {
            $rateSum += $rate;
            if ($rand <= $rateSum) {
                $gotDelay = $delay;
                break;
            }
        }

        return $gotDelay;
    }

    public static function getDeliver()
    {
        global $confDeliver2rate;

        $rand = (rand(0, 1000)) / 10;

        $rateSum = 0;

        $gotDeliver = '';

        foreach ($confDeliver2rate as $deliver => $rate) {
            $rateSum += $rate;
            if ($rand <= $rateSum) {
                $gotDeliver = $deliver;
                break;
            }
        }

        return $gotDeliver;
    }

    public static function getDeliverDelay()
    {
        global $confDeliverDelay2rate;

        $rand = (rand(0, 1000)) / 10;

        $rateSum = 0;

        $gotDelay = "1-2"; //??

        foreach ($confDeliverDelay2rate as $delay => $rate) {
            $rateSum += $rate;
            if ($rand <= $rateSum) {
                $gotDelay = $delay;
                break;
            }
        }

        return $gotDelay;
    }

    /*
     * 上行核心逻辑
     */

    public static function deliverDispatchLogic($spId, $srcId, $mobile, $fd, $redis)
    {
        global $confDeliverRate;

        if (empty($confDeliverRate)) {
            return false;
        }

        $rand = (rand(0, 1000)) / 10;
        if ($rand > $confDeliverRate) {//没命中上行
            return false;
        }

        $gotDeliver = self::getDeliver();

        $gotDelay = self::getDeliverDelay();

        $reportSleepEx = explode("-", $gotDelay);

        $reportSleep = rand($reportSleepEx[0], $reportSleepEx[1]);

        $expireAt = time() + $reportSleep;

        $sequenceId = Cmpp2::$atomitDeliver->add();
        
        $sequenceId = $sequenceId % 1000000000; //防止值重复

        $redis->zAdd(self::getDeliverRedis(), $expireAt, serialize([$spId, $srcId, $mobile, $fd, $gotDeliver, $sequenceId]));
    }

    /*
     * 核心逻辑，长信收完后此方法决定状态走向
     */

    public static function dispatchLogic($isLong, $msgKey, $mobile, $protocol, $fd, $server, $srcId)
    {

        $gotReport = self::getReport();

        $gotDelay = self::getDelay();

        $submitTime = date('ymdHi');

        //获取SPID
        if (!isset(Cmpp2::$fd2spid[$fd])) {
            unset(Cmpp2::$longMsg[$msgKey]);
            return false;
        } else {
            $spid = Cmpp2::$fd2spid[$fd];
        }

        if ($gotReport == "spec_UNKNOWN") {//未知
//            var_dump("hit unknown");
        } else {
            $reportSleepEx = explode("-", $gotDelay);

            $reportSleep = rand($reportSleepEx[0], $reportSleepEx[1]);

            $reportAt = time() + $reportSleep;
            if ($isLong) {
                Cmpp2::$channel_suc_and_fail->push(serialize([$reportAt, $submitTime, $isLong, Cmpp2::$longMsg[$msgKey]['msgid'], $srcId, $mobile, $gotReport, $spid]));
                unset(Cmpp2::$longMsg[$msgKey]);
            } else {
                Cmpp2::$channel_suc_and_fail->push(serialize([$reportAt, $submitTime, $isLong, $protocol->msgIdDecArr, $srcId, $mobile, $gotReport, $spid]));
            }
        }
    }

}

/*
 * 初始化swoole类
 * port填0 主服务获取一个系统随机可用端口
 */
$server = new Swoole\Server('0.0.0.0', 0, SWOOLE_BASE);

Cmpp2::$server = $server;

Cmpp2::$atomit = new Swoole\Atomic();

Cmpp2::$atomitDeliver = new Swoole\Atomic();

$server->set([
    'worker_num' => 1, //必须1个worker
    'enable_coroutine' => true,
    'max_coroutine' => 300000,
        ]
);

//监听连接进入事件
$server->on('Connect', function ($server, $fd) {
    echo "Client: Connect.\n";
});


//监听连接关闭事件
$server->on('Close', function ($server, $fd) {

    $spid = Cmpp2::$fd2spid[$fd] ?? 0;

    unset(Cmpp2::$fd2spid[$fd]);

    if (!empty($spid)) {
        $newFds = [];

        foreach (Cmpp2::$spid2fd[$spid] as $oldFd) {
            if ($oldFd != $fd) {
                $newFds[] = $oldFd;
            }
        }

        Cmpp2::$spid2fd[$spid] = $newFds;
    }

    echo "Client: Close. fd $fd, spid $spid\n";
});

$server->on('receive', function () {
    echo "Recv\n";
});

function realRecv($server, $fd, $from_id, $data)
{
    $protocol = new Cmpp2();

    $protocol->setBinary($data);

    //解析协议头
    $protocol->parseHeader();

    if ($protocol->getCommandId() === Cmpp2::CMPP_ACTIVE_TEST_RESP) {
        //如果是客户端反回的探活包回应则不做处理
        return;
    }

    //解析协议体成功了，执行后续操作
    if ($protocol->parseBody()) {
        $handleRes = $protocol->handle($fd);
    }

    if ($protocol->getCommandId() === Cmpp2::CMPP_SUBMIT && $handleRes) {
        $protocol->packageSubmitResp();
    }

    //发送submit resp同时确认是否关闭连接
    Cmpp2::respAndClose($server, $fd, $protocol);

    //发送report
    if ($server->exist($fd) && $protocol->getCommandId() === Cmpp2::CMPP_SUBMIT) {
        /*
         * 获取完整短信
         */
        $body = $protocol->getBody();
        $mobile = trim($body['Dest_terminal_Id']);

        $isLong = $body['TP_udhi'] == 1 ? true : false;
        if ($isLong) {
            $protocol->parseLongMsgHeader(substr($body['Msg_Content'], 0, 6));
            $msgKey = $fd . $mobile . $protocol->uniqueSmsFlag;
            if (!isset(Cmpp2::$longMsg[$msgKey])) {
                Cmpp2::$longMsg[$msgKey]['total'] = $protocol->smsTotal;
                Cmpp2::$longMsg[$msgKey]['num'] = 1;
            } else {
                Cmpp2::$longMsg[$msgKey]['num'] ++;
            }

            Cmpp2::$longMsg[$msgKey]['msgid'][$protocol->smsIndex] = $protocol->msgIdDecArr;

            if (Cmpp2::$longMsg[$msgKey]['num'] < Cmpp2::$longMsg[$msgKey]['total']) {
                //直接return 等待长信发完
                return;
            }
        } else {
            $msgKey = '';
        }

        /*
         * 分发状态
         */
        Cmpp2::dispatchLogic($isLong, $msgKey, $mobile, $protocol, $fd, $server, $body['Src_Id']);
    }
}

$portObj = [];
foreach ($portArr as $k => $value) {
    $portObj[$k] = $server->listen('0.0.0.0', $value, SWOOLE_SOCK_TCP);
    $portObj[$k]->set([
        'open_length_check' => true,
        'open_tcp_nodelay' => true,
        'package_length_type' => 'N',
        'package_length_offset' => 0,
        'package_body_offset' => 0,
    ]);
    $portObj[$k]->on('receive', 'realRecv');
}

$server->on('WorkerStart', function ($server) {
    Swoole\Runtime::enableCoroutine($flags = SWOOLE_HOOK_ALL);
    Cmpp2::initPool();
    Cmpp2::popSucAndFailRedis();
    Cmpp2::popDeliverRedis();
});



echo("\n\033[32mrealCMPP模拟器启动成功，监听端口 {$portStr}\033[0m\n\n");

//启动服务器
$server->start();
